feat(sqs): htfifo capability poller (Phase 3.D PR 4-B-3a) #721

Merged — bootjp merged 2 commits into main from feat/sqs-htfifo-capability-poller on Apr 30, 2026

Conversation

Owner

@bootjp bootjp commented Apr 30, 2026

Summary

Phase 3.D PR 4-B-3a — adds the stateless htfifo capability poller that PR 5's CreateQueue gate consumes. Stacks on the now-merged #715 (PR 4-B-2, partition resolver). This PR is purely additive: new helper file, new test file, no existing code touched. Next is PR 4-B-3b (leadership-refusal hook + flag flip).

What's added

  • adapter/sqs_capability_poller.go:
    • HTFIFOCapabilityReport{AllAdvertise, Peers} — binary go/no-go signal + per-peer detail for operator triage.
    • HTFIFOCapabilityPeerStatus{Address, HasHTFIFO, Capabilities, Error} — one peer's polling result.
    • PollSQSHTFIFOCapability(ctx, client, peers) — concurrent goroutine-per-peer poll, indexed-channel result aggregation (race-free).
    • Per-peer timeout defaultSQSCapabilityPollTimeout = 3s so a single hung peer can't stall the cluster-wide poll.
    • Body capped at 1 KiB via io.LimitReader so a misconfigured peer can't drain memory.
    • Bare host:port and full http://… / https://… URLs both accepted.
    • Fail-closed on every failure mode: timeout, transport error, non-200, malformed JSON, missing capability. Empty peer list → vacuously AllAdvertise=true (caller validates list completeness).

What's NOT added (deferred)

  • htfifoCapabilityAdvertised stays false. PR 4-B-3b adds the §8 leadership-refusal hook + per-acquisition observer in kv/raftengine/etcd and flips the flag.
  • CreateQueue does NOT yet call this helper. PR 5 lifts the PartitionCount > 1 dormancy gate AND wires the capability check in the same commit (per the §11 rollout plan's "gate-and-lift atomically" rule).

Test plan

9 top-level tests covering the contract surface:

  • TestPollSQSHTFIFOCapability_AllAdvertise — happy path, multiple peers.
  • TestPollSQSHTFIFOCapability_OneMissingFailsClosed — old-binary peer with empty capabilities drops AllAdvertise.
  • TestPollSQSHTFIFOCapability_HTTPErrorFailsClosed — HTTP 500, connection refused, malformed JSON all surface as Error.
  • TestPollSQSHTFIFOCapability_TimeoutFailsClosed — hung peer respects per-peer timeout, full poll bounded.
  • TestPollSQSHTFIFOCapability_EmptyPeersIsVacuouslyTrue — empty peer list contract.
  • TestPollSQSHTFIFOCapability_EmptyPeerAddressFailsClosed — an empty "" entry in the peers slice surfaces an explicit Error.
  • TestPollSQSHTFIFOCapability_FullURLPeer — http:// and https:// URLs accepted alongside bare host:port.
  • TestPollSQSHTFIFOCapability_ConcurrentPolling — 5×200ms peers finish in well under 1s.
  • TestPollSQSHTFIFOCapability_RespectsBodyLimit — 10 KiB response truncated mid-string surfaces as JSON parse error, not garbage decode.
  • TestBuildSQSHealthURL — URL construction edge cases.
  • go test -race ./adapter/... passes.
  • golangci-lint ./adapter/... clean.

Self-review (per CLAUDE.md)

  1. Data loss — read-only HTTP poll; no FSM/Pebble/retention path. No issue.
  2. Concurrency / distributed failures — peer polls run in independent goroutines; results land via an indexed channel so slice writes are obviously race-free. Per-peer timeout enforced via context.WithTimeout so a slow peer can't stall the rest. Body capped via io.LimitReader. No issue.
  3. Performance — N peers polled concurrently, not serially; the test pins this. Per-peer cost is one HTTP round-trip + a JSON parse of a tiny body. No hot-path impact (CreateQueue is a control-plane operation, not request hot path). No issue.
  4. Data consistency — fail-closed on every failure mode preserves the §8.5 "any peer that doesn't respond is treated as not-yet-upgraded" rule. The vacuously-true empty-peer-list case is documented and the caller's responsibility. No issue.
  5. Test coverage — every documented failure path (HTTP error, transport error, JSON parse, timeout, missing capability, empty peer, body-size cap) is pinned. Concurrent polling is pinned (would have caught a regression to serial). URL construction edges pinned.

Adds a stateless helper that polls each peer's /sqs_health
endpoint and reports whether all advertise the htfifo capability.
This is the building block PR 5's CreateQueue gate uses to refuse
PartitionCount > 1 until every node in the cluster has the
HT-FIFO data plane.

What changes

adapter/sqs_capability_poller.go (new file):
  - HTFIFOCapabilityReport: AllAdvertise + per-peer detail.
  - HTFIFOCapabilityPeerStatus: address, HasHTFIFO flag, raw
    capabilities slice, Error string for failure detail.
  - PollSQSHTFIFOCapability(ctx, client, peers): polls each peer
    concurrently. Returns AllAdvertise=false on any timeout, HTTP
    error, malformed JSON, or missing-capability — fail-closed
    per §8.5.
  - Vacuously AllAdvertise=true on empty peer list (caller is
    responsible for ensuring the peer list is meaningful).
  - Per-peer timeout capped by defaultSQSCapabilityPollTimeout
    (3s) so a single hung peer cannot stall the whole poll.
  - Body capped at 1 KiB via io.LimitReader so a misconfigured
    peer cannot drain memory.
  - Bare host:port and full http://… / https://… URLs both
    accepted — operators can front the endpoint with TLS or a
    proxy without the helper having to know.

What does NOT change yet

- htfifoCapabilityAdvertised stays false. PR 4-B-3b adds the §8
  leadership-refusal hook (startup + per-acquisition observer)
  and flips this flag.
- CreateQueue does NOT yet call PollSQSHTFIFOCapability — PR 5
  lifts the PartitionCount > 1 dormancy gate AND wires the
  capability check in the same commit.

Tests

adapter/sqs_capability_poller_test.go: 9 top-level tests across
the contract surface.
- AllAdvertise happy path with multiple peers.
- One-bad-apple: a peer with empty capabilities drops AllAdvertise.
- Transport failures (HTTP 500, connection refused, malformed
  JSON) all fail closed with non-empty Error.
- Hung peer respects per-peer timeout — test bound is well below
  what a serial poll would take.
- Empty peer list → AllAdvertise vacuously true.
- Empty peer address → fail closed with explicit Error.
- Full-URL peer (http:// or https://) accepted alongside bare
  host:port.
- Concurrent polling: 5 peers each delaying 200ms must finish in
  well under 1 second (serial would take 1s+).
- Body-size limit: a 10 KiB response truncated mid-string is
  surfaced as a JSON parse error, not a half-decoded value.
- TestBuildSQSHealthURL covers the URL construction edge cases.
@coderabbitai

coderabbitai Bot commented Apr 30, 2026

Warning: rate limit exceeded — @bootjp has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 4 minutes and 11 seconds before requesting another review.

Review info — configuration: defaults; review profile: CHILL; plan: Pro; run ID: 4ddf5726-2893-459d-b166-d854a09d0db0. Reviewing files that changed from the base of the PR and between bce448f and 2d9e06d.

Files selected for processing (2):
  • adapter/sqs_capability_poller.go
  • adapter/sqs_capability_poller_test.go

Owner Author

bootjp commented Apr 30, 2026

@claude review

Phase 3.D PR 4-B-3a — htfifo capability poller. Pure-additive helper that PR 5's CreateQueue gate will consume; htfifoCapabilityAdvertised stays false (PR 4-B-3b will flip it after the leadership-refusal hook lands). PR description has the full self-review and test plan.


claude Bot commented Apr 30, 2026

Claude finished @bootjp's task in 2m 18s


Code Review: feat(sqs): htfifo capability poller (Phase 3.D PR 4-B-3a)

Tasks:

  • Read changed files
  • Review implementation (adapter/sqs_capability_poller.go)
  • Review tests (adapter/sqs_capability_poller_test.go)
  • Check consistency with existing SQS adapter patterns
  • Post final review

Verdict: Approve with minor notes. The implementation is correct. The concurrency model is clean, the fail-closed contract is consistent, and the test coverage is thorough. A few small issues worth calling out below, none blocking.


Implementation (adapter/sqs_capability_poller.go)

Goroutine + channel model (lines 114–137) — Correct. Buffered channel sized to len(peers) so every goroutine can send without blocking; wg.Wait() before close(results) guarantees the range-drain sees all results. Race-free slice writes via the indexed channel. No goroutine leaks in any path (parent-ctx cancellation or per-peer timeout both return through pollOneSQSPeerForHTFIFO, which sends its result before returning).

Per-peer timeout (line 153) — context.WithTimeout(ctx, defaultSQSCapabilityPollTimeout) takes the shorter of the caller's deadline and now+3s, which is the right semantics.

Body cap (line 176) — Consistent with how s3.go:1523 does it (io.LimitReader); sqs_auth.go uses http.MaxBytesReader instead but the effect is the same in this direction (outbound client read vs. inbound server read). Fine as-is.

Error string via fmt.Sprintf vs. errors.Wrapf (lines 172–173, 184–185) — The status.Error field is a string, not an error, so wrapcheck doesn't apply here. The mixed usage (errors.Wrapf(...).Error() for transport errors, fmt.Sprintf for logic errors) is fine.

Minor issue 1 — Non-200 body not drained

// adapter/sqs_capability_poller.go:171-174
if resp.StatusCode != http.StatusOK {
    status.Error = fmt.Sprintf(...)
    return status  // body is closed but not drained
}

resp.Body.Close() without draining prevents the HTTP transport from reusing the TCP connection. In a control-plane path (one call per CreateQueue) this is acceptable, but if the gate ever gets called on a cluster of many nodes under load, this will force connection teardown on every non-200 peer response. Adding _, _ = io.Copy(io.Discard, resp.Body) before the early return is the idiomatic fix. Fix this →

Minor issue 2 — buildSQSHealthURL double-path edge case

// adapter/sqs_capability_poller.go:209-214
func buildSQSHealthURL(peer string) string {
    if strings.HasPrefix(peer, "http://") || strings.HasPrefix(peer, "https://") {
        return strings.TrimRight(peer, "/") + sqsHealthPath
    }
    return "http://" + peer + sqsHealthPath
}

A caller who passes "http://node:5050/sqs_health" (full URL already including the path) would receive "http://node:5050/sqs_health/sqs_health" — the path is doubled. The contract ("pass a base URL") is documented in the comment and the test cases, so this isn't a latent bug in any callers today. But since TestBuildSQSHealthURL is specifically the regression fixture for this function's edge cases, it's worth adding a "http://node:5050/sqs_health" case that documents the behavior (whether you decide to detect+reject it or just document that callers must not do it). Fix this →


Tests (adapter/sqs_capability_poller_test.go)

Coverage is excellent — every documented failure mode is pinned, concurrent polling is timed, URL construction has its own table, body-size cap is independently exercised.

Nit — TestPollSQSHTFIFOCapability_TimeoutFailsClosed tests parent-context cancellation, not the per-peer cap

// adapter/sqs_capability_poller_test.go:147-151
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
report := PollSQSHTFIFOCapability(ctx, nil, []string{addrHang})

The comment says "force the per-peer timeout path quickly", but what this test actually exercises is ctx expiring at 500ms (which is shorter than defaultSQSCapabilityPollTimeout = 3s). The context.WithTimeout(ctx, 3s) inside pollOneSQSPeerForHTFIFO inherits the shorter deadline from the parent, so the request times out via the parent context, not the per-peer cap. The behavior pinned is correct (fail-closed), but the per-peer defaultSQSCapabilityPollTimeout = 3s path is never independently exercised. A complementary test that passes context.Background() (no deadline) and a peer that hangs for 3.5s would close this gap and validate the per-peer cap independently. Not a bug — just a gap in the stated coverage.

Nit — TestPollSQSHTFIFOCapability_OneMissingFailsClosed peer ordering assumption

// adapter/sqs_capability_poller_test.go:73-80
require.True(t, report.Peers[0].HasHTFIFO)   // addrGood
require.False(t, report.Peers[1].HasHTFIFO)  // addrOld

The indexed-channel design (goroutine writes to results at its input index) makes peer-order deterministic in the output slice, so these assertions are correctly testing the indexed aggregation. Calling this out because it's a subtle property worth keeping — if pollOneSQSPeerForHTFIFO is ever inlined into the goroutine using a shared-map instead of the indexed channel, this test would start flapping. The test is a good sentinel for that regression.


Summary

Lens Finding
Data loss None — read-only poll, no FSM/Pebble path
Concurrency Correct — indexed channel avoids slice-write races; per-peer timeout enforced; no goroutine leaks
Performance Concurrent polling confirmed by test; control-plane only, no hot-path impact
Data consistency Fail-closed contract is sound and consistent with §8.5; empty-peer vacuous-true is documented
Test coverage Excellent; two minor gaps noted (per-peer cap not exercised independently; double-path URL edge case not pinned)

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request introduces a new SQS capability poller that concurrently checks cluster peers for the htfifo capability. The implementation includes structured reporting, per-peer timeouts, and response body size limits to ensure reliability. Feedback was provided regarding the silent ignoring of errors when closing the HTTP response body, suggesting that these should be logged to provide visibility into potential resource cleanup issues.

Comment thread: adapter/sqs_capability_poller.go (outdated)
status.Error = errors.Wrapf(err, "GET %q", url).Error()
return status
}
defer func() { _ = resp.Body.Close() }()
Contributor

medium

The error from resp.Body.Close() is being silently ignored. According to the general rules, errors from Close() methods on resources like network connections should be logged to ensure resource leaks or other cleanup problems are visible. If a logger is not available in this context, consider how to surface this failure or ensure it is handled according to the project's reliability standards.

References
  1. Do not silently ignore errors from Close() methods on resources like network connections. Log them to ensure resource leaks or other cleanup problems are visible.

PR #721 round 1 review fixes:

1) Gemini medium — Body.Close error silently ignored

  defer func() { _ = resp.Body.Close() }() dropped any close
  error. A failed Close indicates a connection that the
  http.Transport will tear down rather than reuse — under load
  this can mask leaking connections / file descriptors. Logged
  via slog.Warn with the peer address so operators can grep the
  cluster log when triaging.

2) Claude minor — non-200 body not drained

  resp.Body.Close() without first draining the body prevents the
  http.Transport from reusing the underlying TCP connection. In a
  control-plane path (one CreateQueue call per gate check) this
  is acceptable, but if the gate ever fans out across many peers
  under load, the failed-peer branch would force connection
  teardown on every error response. Drain via
  io.Copy(io.Discard, io.LimitReader(resp.Body,
  sqsCapabilityMaxBodyBytes)) before the early return so the
  transport can reuse the connection.

3) Claude nit — per-peer cap not exercised independently

  TestPollSQSHTFIFOCapability_TimeoutFailsClosed used a 500ms
  parent ctx — the request actually timed out via the parent
  context, not the per-peer cap. The default 3s per-peer cap was
  never independently exercised by tests.

  Refactored signature to PollSQSHTFIFOCapability(ctx, peers,
  cfg PollerConfig). PollerConfig{HTTPClient, PerPeerTimeout} is
  the single options surface — zero values pick safe defaults.
  Renamed the existing test to
  TestPollSQSHTFIFOCapability_ParentContextDeadlineFailsClosed
  and added
  TestPollSQSHTFIFOCapability_PerPeerTimeoutFailsClosed which
  uses context.Background() and PerPeerTimeout=100ms to exercise
  the cap independently.

  PerPeerTimeout is also a sensible operator knob — different
  cluster latencies want different bounds. Caller-side: only
  *_test.go files use the function today; PR 5's CreateQueue gate
  will pick the appropriate timeout when it wires this up.

4) Claude minor — buildSQSHealthURL double-path edge case

  A caller passing a URL that already includes the health path
  (e.g. "http://node:5050/sqs_health") would receive a doubled
  path. Added an explicit test case to TestBuildSQSHealthURL
  documenting the behavior and the contract ("pass a base URL or
  host:port, never a full request URL"). A future refactor can
  intentionally change the contract; the test will catch it.

5) Audit per the lessons-learned discipline

  PollSQSHTFIFOCapability is exported but has no production
  callers yet — only the test file references it. grep confirmed
  the API change is safe.
Owner Author

bootjp commented Apr 30, 2026

@claude review

Round 2 — addresses Round 1 review:

1. Gemini medium — Body.Close error silently ignored: now logs via slog.Warn with peer address. A failed Close means the http.Transport will tear down rather than reuse the connection — silently dropping it could mask leaking connections under load.

2. Claude minor — non-200 body not drained: io.Copy(io.Discard, ...) before the early return so the transport can reuse the connection. Drain is wrapped in the same io.LimitReader cap so a misbehaving peer can't drain memory through the discard path.

3. Claude nit — per-peer cap not exercised independently: refactored PollSQSHTFIFOCapability signature to (ctx, peers, cfg PollerConfig) where PollerConfig.PerPeerTimeout is a tunable. Tests now have:

  • TestPollSQSHTFIFOCapability_ParentContextDeadlineFailsClosed (renamed from TimeoutFailsClosed) — pins parent-ctx deadline path
  • TestPollSQSHTFIFOCapability_PerPeerTimeoutFailsClosed (new) — uses context.Background() + PerPeerTimeout=100ms to exercise the cap independently of any parent deadline

Bonus: PerPeerTimeout is also a useful operator knob — different cluster latencies want different bounds. Audit per lessons-learned: grep -rn PollSQSHTFIFOCapability confirmed only the test file references the function — API change is safe.

4. Claude minor — buildSQSHealthURL double-path edge case: added an explicit test case documenting the behavior (caller must not pass a URL that already includes /sqs_health).

go test -race ./adapter/... and golangci-lint clean.


claude Bot commented Apr 30, 2026

Claude finished @bootjp's task in 2m 8s


Code Review: Round 2 — feat(sqs): htfifo capability poller

Tasks:

  • Read changed files
  • Review Round 2 changes against Round 1 feedback
  • Check implementation correctness
  • Post final review

Verdict: LGTM. All four Round 1 items are correctly addressed. No new issues found.


Round 1 feedback checklist

1. Body.Close() error logged (Gemini medium) ✅

// adapter/sqs_capability_poller.go:204-209
defer func() {
    if cerr := resp.Body.Close(); cerr != nil {
        slog.Warn("sqs capability poller: response body close failed",
            "peer", peer, "err", cerr)
    }
}()

Correct. Uses "peer" and "err" keys consistent with sqs_catalog.go and redis_retry.go. The closure captures peer from the enclosing scope correctly. The defer fires after every return path, so all cleanup is centralized.

2. Non-200 body drained before early return (Claude minor) ✅

// adapter/sqs_capability_poller.go:211-215
if resp.StatusCode != http.StatusOK {
    _, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, sqsCapabilityMaxBodyBytes))
    status.Error = fmt.Sprintf("%s returned HTTP %d", url, resp.StatusCode)
    return status
}

Correct. The drain is bounded by the same 1 KiB LimitReader as the success path — a misbehaving peer can't exhaust memory through the discard. After the drain, the deferred Close() fires without needing to consume the remainder itself (on a normally-behaved peer, the body is already empty; on an oversized one, the transport tears down the connection). Idiomatic _, _ = for discard.

3. PollerConfig.PerPeerTimeout; per-peer cap exercised independently (Claude nit) ✅

The PollerConfig struct is clean — zero value is safe, all fields optional. The renamed TestPollSQSHTFIFOCapability_ParentContextDeadlineFailsClosed and new TestPollSQSHTFIFOCapability_PerPeerTimeoutFailsClosed now test two distinct code paths:

// Parent-ctx path (line 143): ctx expires at 500ms < 3s per-peer cap
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)

// Per-peer-cap path (line 179-180): context.Background() + 100ms cap
report := PollSQSHTFIFOCapability(context.Background(), []string{addrHang},
    PollerConfig{PerPeerTimeout: 100 * time.Millisecond})

The < 2*time.Second bound for the 100ms cap test is generous enough to avoid CI flakiness but tight enough to catch a regression to unbounded behavior. The boundary condition (perPeerTimeout <= 0 falls back to defaultSQSCapabilityPollTimeout) correctly handles the zero value.

4. buildSQSHealthURL double-path documented in test (Claude minor) ✅

// adapter/sqs_capability_poller_test.go:291-297
{
    "http://node.example:5050" + sqsHealthPath,
    "http://node.example:5050" + sqsHealthPath + sqsHealthPath,
},

The comment ("pin the behaviour so a future refactor can either keep it or change the contract intentionally") is the right framing — document and lock rather than silently accept. No caller today passes a full request URL to the poller, so this is a latent trap, not an active bug.


One micro-observation (not blocking)

The slog error key is "err" here (consistent with sqs_catalog.go, redis_retry.go) but redis_delta_compactor.go uses "error". There's a pre-existing inconsistency in the adapter package; this PR is consistent with the SQS-adjacent files, so no change needed here.


Summary

Round 1 item Status
Body.Close() error logged ✅ Addressed
Non-200 body drained ✅ Addressed
Per-peer cap tested independently ✅ Addressed
Double-path URL edge case pinned ✅ Addressed

The concurrency model, fail-closed contract, body-size cap, and all test coverage from Round 1 remain intact. Ready to merge.

@bootjp bootjp merged commit e9f33eb into main Apr 30, 2026
9 checks passed
@bootjp bootjp deleted the feat/sqs-htfifo-capability-poller branch April 30, 2026 11:54